文章摘要
加载中...|
此内容根据文章生成,并经过人工审核,仅用于文章内容的解释与总结 投诉

概述

随着任务复杂度的增加,单个 Agent 往往难以应对。多 Agent 系统通过让多个专门化的 Agent 协作,可以解决更复杂的问题。本文将深入探讨多 Agent 协作模式、规划机制和主流框架。

为什么需要多 Agent

单 Agent 的局限

text
┌─────────────────────────────────────────────────────────┐
│              单 Agent vs 多 Agent 对比                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  单 Agent:                                              │
│  • 需要掌握所有技能                                      │
│  • 难以并行处理                                          │
│  • 容易出错(牵一发而动全身)                             │
│  • 难以扩展                                              │
│  • 上下文窗口限制                                        │
│                                                         │
│  多 Agent:                                              │
│  • 专业化分工(每个 Agent 专注一个领域)                   │
│  • 并行执行任务                                          │
│  • 错误隔离(一个出错不影响其他)                         │
│  • 易于扩展                                              │
│  • 分布式处理                                            │
│                                                         │
└─────────────────────────────────────────────────────────┘

多 Agent 的优势

优势说明示例
专业化每个 Agent 专注于特定领域代码 Agent、写作 Agent、分析 Agent
并行化同时执行多个任务同时搜索多个信息源
容错性单个失败不影响整体一个搜索失败,其他继续
可扩展容易添加新能力添加新的专业 Agent
模块化独立开发和测试每个单元可单独优化

Agent 规划机制

规划的重要性

text
┌─────────────────────────────────────────────────────────┐
│                   Agent 规划流程                           │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  用户任务                                                │
│     │                                                    │
│     ▼                                                    │
│  ┌─────────────────────────────────────┐               │
│  │        1. 理解目标                   │               │
│  │   分析任务要求、约束条件             │               │
│  └─────────────────────────────────────┘               │
│     │                                                    │
│     ▼                                                    │
│  ┌─────────────────────────────────────┐               │
│  │        2. 任务分解                   │               │
│  │   将复杂任务分解为子任务             │               │
│  └─────────────────────────────────────┘               │
│     │                                                    │
│     ▼                                                    │
│  ┌─────────────────────────────────────┐               │
│  │        3. 分配资源                   │               │
│  │   决定哪些 Agent 执行哪些任务        │               │
│  └─────────────────────────────────────┘               │
│     │                                                    │
│     ▼                                                    │
│  ┌─────────────────────────────────────┐               │
│  │        4. 执行与监控                 │               │
│  │   执行任务,检查进度,处理错误       │               │
│  └─────────────────────────────────────┘               │
│     │                                                    │
│     ▼                                                    │
│  ┌─────────────────────────────────────┐               │
│  │        5. 结果整合                   │               │
│  │   合并各 Agent 结果,生成最终输出    │               │
│  └─────────────────────────────────────┘               │
│                                                         │
└─────────────────────────────────────────────────────────┘

任务分解策略

python
from typing import List, Dict
from pydantic import BaseModel

class SubTask(BaseModel):
    """子任务"""
    id: str
    description: str
    agent_type: str  # 需要什么类型的 Agent
    dependencies: List[str]  # 依赖的其他任务
    inputs: Dict  # 输入参数

class TaskPlanner:
    """任务规划器"""

    def __init__(self):
        self.planner_llm = ChatOpenAI(model="gpt-4o")

    def plan(self, task: str) -> List[SubTask]:
        """将任务分解为子任务"""
        prompt = f"""将以下任务分解为多个子任务。

任务: {task}

请考虑:
1. 需要哪些步骤?
2. 每个步骤需要什么类型的 Agent?
3. 任务之间的依赖关系?

返回JSON格式的子任务列表。"""

        response = self.planner_llm.invoke(prompt)

        # 解析并返回子任务
        # 简化示例
        return self._parse_plan(response.content)

    def _parse_plan(self, plan_text: str) -> List[SubTask]:
        """解析规划结果"""
        # 实际应用中应该使用结构化输出
        import json

        try:
            data = json.loads(plan_text)
            return [SubTask(**task) for task in data.get("tasks", [])]
        except:
            # 返回默认规划
            return [
                SubTask(
                    id="1",
                    description="分析任务",
                    agent_type="analyst",
                    dependencies=[],
                    inputs={"task": task}
                ),
                SubTask(
                    id="2",
                    description="执行任务",
                    agent_type="executor",
                    dependencies=["1"],
                    inputs={}
                )
            ]

# 使用
planner = TaskPlanner()
subtasks = planner.plan("帮我分析一下苹果公司最近的财报")

for task in subtasks:
    print(f"{task.id}: {task.description} (Agent: {task.agent_type})")

动态规划

python
class AdaptivePlanner:
    """自适应规划器"""

    def __init__(self):
        self.planner = ChatOpenAI(model="gpt-4o")

    def plan_and_execute(self, task: str, max_iterations: int = 5):
        """规划并执行,根据结果动态调整"""

        context = {"task": task, "iteration": 0}
        results = []

        for i in range(max_iterations):
            context["iteration"] = i

            # 1. 生成/更新计划
            plan = self._generate_plan(context)

            # 2. 执行当前步骤
            result = self._execute_step(plan, context)
            results.append(result)

            # 3. 检查是否完成
            if self._is_complete(result, context):
                break

            # 4. 更新上下文
            context["previous_results"] = results

        return self._combine_results(results)

    def _generate_plan(self, context: Dict) -> Dict:
        """生成或更新计划"""
        previous_results = context.get("previous_results", [])

        if not previous_results:
            # 首次规划
            prompt = f"""为以下任务制定执行计划:
任务: {context['task']}

返回当前应该执行的步骤。"""
        else:
            # 根据之前结果调整计划
            prompt = f"""基于之前的结果调整计划:

原始任务: {context['task']}
之前的结果: {previous_results}

返回下一步应该执行的步骤。"""

        response = self.planner.invoke(prompt)
        return self._parse_plan(response.content)

    def _execute_step(self, plan: Dict, context: Dict) -> Dict:
        """执行计划步骤"""
        # 根据计划调用相应的 Agent
        agent_type = plan.get("agent_type")
        agent = self._get_agent(agent_type)

        return agent.execute(plan.get("inputs", {}))

    def _is_complete(self, result: Dict, context: Dict) -> bool:
        """检查任务是否完成"""
        prompt = f"""判断以下任务是否已完成:

任务: {context['task']}
当前结果: {result}

返回 "yes" 或 "no"。"""

        response = self.planner.invoke(prompt)
        return "yes" in response.content.lower()

多 Agent 通信模式

通信模式分类

text
┌─────────────────────────────────────────────────────────┐
│                多 Agent 通信模式                          │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  1. 层次式 (Hierarchical)                               │
│     Manager Agent                                       │
│        ├── Worker Agent 1                               │
│        ├── Worker Agent 2                               │
│        └── Worker Agent 3                               │
│                                                         │
│  2. 平等式 (Flat)                                       │
│     Agent 1 ←→ Agent 2 ←→ Agent 3                       │
│                                                         │
│  3. 网络式 (Network)                                    │
│     ┌─────┐     ┌─────┐                                │
│     │ A1  │────▶│ A2  │                                │
│     └─────┘     └─────┘                                │
│        │           │                                    │
│        ▼           ▼                                    │
│     ┌─────┐     ┌─────┐                                │
│     │ A3  │────▶│ A4  │                                │
│     └─────┘     └─────┘                                │
│                                                         │
│  4. 议会式 (Consensus)                                  │
│     所有 Agent 讨论并投票                                │
│                                                         │
└─────────────────────────────────────────────────────────┘

层次式协作

python
from abc import ABC, abstractmethod
from typing import Any, Dict

class Agent(ABC):
    """Agent 基类"""

    def __init__(self, name: str):
        self.name = name

    @abstractmethod
    def execute(self, task: str, context: Dict = None) -> Dict:
        """执行任务"""
        pass

class ManagerAgent(Agent):
    """管理 Agent"""

    def __init__(self, name: str, workers: List[Agent]):
        super().__init__(name)
        self.workers = workers
        self.planner = ChatOpenAI(model="gpt-4o")

    def execute(self, task: str, context: Dict = None) -> Dict:
        """分配任务并协调执行"""
        context = context or {}

        # 1. 任务分解
        subtasks = self._decompose_task(task)

        # 2. 分配给 Worker
        worker_results = {}
        for subtask in subtasks:
            worker = self._find_worker(subtask["agent_type"])
            if worker:
                result = worker.execute(subtask["task"], context)
                worker_results[subtask["id"]] = result

        # 3. 整合结果
        return self._integrate_results(task, worker_results)

    def _decompose_task(self, task: str) -> List[Dict]:
        """分解任务"""
        prompt = f"""将任务分解并分配给合适的 Agent:

可用 Agent 类型:
- researcher: 研究信息
- writer: 撰写内容
- analyst: 分析数据
- coder: 编写代码

任务: {task}

返回子任务列表,包含 agent_type 和 task 字段。"""

        response = self.planner.invoke(prompt)
        # 简化:返回解析后的子任务
        return self._parse_subtasks(response.content)

    def _find_worker(self, agent_type: str) -> Agent:
        """找到合适的 Worker"""
        for worker in self.workers:
            if worker.agent_type == agent_type:
                return worker
        return None

    def _integrate_results(self, task: str, results: Dict) -> Dict:
        """整合结果"""
        prompt = f"""整合以下 Agent 的执行结果:

原始任务: {task}
执行结果: {json.dumps(results, ensure_ascii=False)}

生成最终输出。"""

        response = self.planner.invoke(prompt)
        return {"final_output": response.content, "details": results}

class WorkerAgent(Agent):
    """Worker Agent"""

    def __init__(self, name: str, agent_type: str):
        super().__init__(name)
        self.agent_type = agent_type
        self.llm = ChatOpenAI(model="gpt-4o")

    def execute(self, task: str, context: Dict = None) -> Dict:
        """执行分配的任务"""
        prompt = f"""你是 {self.agent_type} Agent。执行以下任务:

任务: {task}
上下文: {context or '无'}

返回执行结果。"""

        response = self.llm.invoke(prompt)
        return {
            "agent": self.name,
            "type": self.agent_type,
            "result": response.content
        }

# 使用
researcher = WorkerAgent("研究专家", "researcher")
writer = WorkerAgent("写作专家", "writer")
analyst = WorkerAgent("分析专家", "analyst")

manager = ManagerAgent("项目经理", [researcher, writer, analyst])

result = manager.execute("帮我写一份关于AI发展趋势的报告")
print(result["final_output"])

平等式协作

python
class FlatAgent(Agent):
    """平等 Agent,可以互相通信"""

    def __init__(self, name: str, agent_type: str, peers: List['FlatAgent'] = None):
        super().__init__(name)
        self.agent_type = agent_type
        self.peers = peers or []
        self.llm = ChatOpenAI(model="gpt-4o")
        self.message_queue = []

    def send_message(self, recipient: str, message: str):
        """发送消息给其他 Agent"""
        for peer in self.peers:
            if peer.name == recipient:
                peer.receive_message(self.name, message)
                return True
        return False

    def receive_message(self, sender: str, message: str):
        """接收消息"""
        self.message_queue.append({
            "from": sender,
            "message": message
        })

    def execute(self, task: str, context: Dict = None) -> Dict:
        """执行任务,可以与其他 Agent 通信"""
        # 处理收到的消息
        incoming_messages = self.message_queue.copy()
        self.message_queue.clear()

        # 构建提示
        prompt = f"""你是 {self.name},类型是 {self.agent_type}

任务: {task}
上下文: {context or '无'}

收到的消息:
{self._format_messages(incoming_messages)}

可以发送消息给其他 Agent: {[p.name for p in self.peers]}

执行任务,如果需要与其他 Agent 协作,说明要发送的消息。
格式:SEND <Agent名称> <消息内容>

返回结果。"""

        response = self.llm.invoke(prompt)
        result = response.content

        # 处理发送消息的指令
        self._process_send_commands(result)

        return {
            "agent": self.name,
            "result": result
        }

    def _format_messages(self, messages: List[Dict]) -> str:
        """格式化消息"""
        if not messages:
            return "无"
        return "\n".join([
            f"- {msg['from']}: {msg['message']}"
            for msg in messages
        ])

    def _process_send_commands(self, text: str):
        """处理发送消息指令"""
        import re
        pattern = r'SEND (\w+) (.+)'
        matches = re.findall(pattern, text)

        for recipient, message in matches:
            self.send_message(recipient, message)

class FlatMultiAgentSystem:
    """平等式多 Agent 系统"""

    def __init__(self):
        self.agents = []

    def add_agent(self, agent: FlatAgent):
        """添加 Agent"""
        self.agents.append(agent)
        # 更新所有 Agent 的 peers
        for a in self.agents:
            a.peers = [other for other in self.agents if other != a]

    def execute(self, task: str, max_rounds: int = 3) -> Dict:
        """执行任务,多轮协作"""
        results = []

        for round in range(max_rounds):
            print(f"\n=== 第 {round + 1} 轮 ===")

            round_results = {}
            for agent in self.agents:
                result = agent.execute(
                    task,
                    {"round": round, "previous_results": results}
                )
                round_results[agent.name] = result

            results.append(round_results)

            # 检查是否需要继续
            if self._should_stop(results):
                break

        return self._combine_results(results)

    def _should_stop(self, results: List[Dict]) -> bool:
        """判断是否应该停止"""
        # 简化:如果所有 Agent 都没有发送消息,认为完成
        last_round = results[-1]
        for agent in self.agents:
            if agent.message_queue:
                return False
        return True

# 使用
system = FlatMultiAgentSystem()

system.add_agent(FlatAgent("代码专家", "coder"))
system.add_agent(FlatAgent("测试专家", "tester"))
system.add_agent(FlatAgent("文档专家", "writer"))

result = system.execute("开发一个用户登录功能")
print(result)

主流多 Agent 框架

框架对比

框架语言特点适用场景
AutoGenPython微软开发,对话式通用多 Agent
CrewAIPython角色明确,流程清晰企业流程自动化
MetaGPTPython模拟软件公司软件开发
LangGraphPython状态机,可控制复杂工作流
AgentScopePython阿里开发,支持多模态多模态应用

AutoGen

python
# pip install pyautogen
import autogen

config_list = [
    {
        "model": "gpt-4o",
        "api_key": "your-api-key"
    }
]

# 定义 Agent
assistant = autogen.AssistantAgent(
    name="assistant",
    llm_config={
        "config_list": config_list,
        "temperature": 0
    }
)

user_proxy = autogen.UserProxyAgent(
    name="user_proxy",
    human_input_mode="NEVER",
    max_consecutive_auto_reply=10,
    code_execution_config={
        "work_dir": "coding",
        "use_docker": False
    }
)

# 创建对话
user_proxy.initiate_chat(
    assistant,
    message="帮我写一个Python快排算法"
)

CrewAI

python
# pip install crewai
from crewai import Agent, Task, Crew

# 定义 Agent
researcher = Agent(
    role="研究员",
    goal="研究最新技术趋势",
    backstory="你是一位经验丰富的技术研究员",
    llm=ChatOpenAI(model="gpt-4o"),
    verbose=True
)

writer = Agent(
    role="技术作家",
    goal="撰写清晰的技术文章",
    backstory="你擅长将复杂的技术概念解释清楚",
    llm=ChatOpenAI(model="gpt-4o"),
    verbose=True
)

# 定义任务
research_task = Task(
    description="研究 AI Agent 的发展趋势",
    agent=researcher
)

write_task = Task(
    description="根据研究结果撰写一篇技术文章",
    agent=writer,
    context=[research_task]  # 依赖研究任务
)

# 创建 Crew
crew = Crew(
    agents=[researcher, writer],
    tasks=[research_task, write_task],
    verbose=True
)

# 执行
result = crew.kickoff()
print(result)

LangGraph

python
# pip install langgraph
from langgraph.graph import StateGraph, END
from typing import TypedDict

class AgentState(TypedDict):
    """Agent 状态"""
    task: str
    research_result: str
    draft: str
    final_output: str

def research_node(state: AgentState):
    """研究节点"""
    researcher = ChatOpenAI(model="gpt-4o")
    result = researcher.invoke(f"研究: {state['task']}")
    return {"research_result": result.content}

def write_node(state: AgentState):
    """写作节点"""
    writer = ChatOpenAI(model="gpt-4o")
    result = writer.invoke(
        f"基于以下研究撰写文章: {state['research_result']}"
    )
    return {"draft": result.content}

def review_node(state: AgentState):
    """审核节点"""
    reviewer = ChatOpenAI(model="gpt-4o")
    result = reviewer.invoke(f"审核并改进: {state['draft']}")
    return {"final_output": result.content}

# 构建图
workflow = StateGraph(AgentState)

workflow.add_node("research", research_node)
workflow.add_node("write", write_node)
workflow.add_node("review", review_node)

workflow.set_entry_point("research")
workflow.add_edge("research", "write")
workflow.add_edge("write", "review")
workflow.add_edge("review", END)

# 编译
app = workflow.compile()

# 执行
result = app.invoke({"task": "写一篇关于 AI 的文章"})
print(result["final_output"])

实战:构建研究小组 Agent

python
from typing import List, Dict, Optional
import json
from dataclasses import dataclass

@dataclass
class Message:
    """Agent 之间的消息"""
    sender: str
    receiver: str
    content: str
    round: int

class ResearchGroupAgent:
    """研究小组 Agent"""

    def __init__(self, name: str, role: str, expertise: List[str]):
        self.name = name
        self.role = role
        self.expertise = expertise
        self.llm = ChatOpenAI(model="gpt-4o")
        self.memory = []  # 对话记忆
        self.findings = []  # 发现/贡献

    def receive(self, message: Message) -> Optional[Message]:
        """接收消息并处理"""
        self.memory.append(message)

        # 构建上下文
        context = self._build_context()

        # 决定是否需要回复
        decision = self._decide_response(message.content, context)

        if decision["should_respond"]:
            return Message(
                sender=self.name,
                receiver=message.sender,
                content=decision["response"],
                round=message.round + 1
            )

        return None

    def _build_context(self) -> str:
        """构建对话上下文"""
        recent = self.memory[-5:]  # 最近5条消息
        return "\n".join([
            f"{msg.sender}: {msg.content}"
            for msg in recent
        ])

    def _decide_response(self, input_msg: str, context: str) -> Dict:
        """决定是否回复以及回复内容"""
        prompt = f"""你是 {self.name},角色是 {self.role}
专业领域: {', '.join(self.expertise)}

对话历史:
{context}

新消息: {input_msg}

决定是否需要回复。如果需要:
1. 提供你的专业见解
2. 或提出相关问题
3. 或总结当前进展

返回JSON格式:
{{
    "should_respond": true/false,
    "response": "回复内容"
}}"""

        response = self.llm.invoke(prompt)

        try:
            return json.loads(response.content)
        except:
            return {"should_respond": False, "response": ""}

class ResearchGroup:
    """研究小组"""

    def __init__(self, topic: str):
        self.topic = topic
        self.agents = []
        self.messages = []
        self.round = 0

    def add_agent(self, agent: ResearchGroupAgent):
        """添加 Agent"""
        self.agents.append(agent)

    def discuss(self, max_rounds: int = 5) -> Dict:
        """进行讨论"""
        # 初始提示
        initial_msg = Message(
            sender="Moderator",
            receiver=self.agents[0].name,
            content=f"请开始讨论研究主题: {self.topic}。首先由 {self.agents[0].role} 发言。",
            round=0
        )

        self.messages.append(initial_msg)

        # 多轮讨论
        for self.round in range(max_rounds):
            print(f"\n=== 第 {self.round + 1} 轮讨论 ===")

            round_active = False

            # 收集本轮所有回复
            new_messages = []

            for agent in self.agents:
                # 找到发给该 Agent 的消息
                for msg in self.messages:
                    if msg.receiver == agent.name and msg.round == self.round:
                        response = agent.receive(msg)
                        if response:
                            new_messages.append(response)
                            round_active = True
                            print(f"{response.sender} -> {response.receiver}: {response.content[:50]}...")

            # 添加新消息
            self.messages.extend(new_messages)

            # 检查是否应该结束
            if not round_active:
                print("\n讨论结束")
                break

        # 生成总结
        return self._generate_summary()

    def _generate_summary(self) -> Dict:
        """生成讨论总结"""
        summary_llm = ChatOpenAI(model="gpt-4o")

        conversation = "\n".join([
            f"{msg.sender}: {msg.content}"
            for msg in self.messages
        ])

        prompt = f"""总结以下关于 "{self.topic}" 的讨论:

对话内容:
{conversation}

请提供:
1. 主要观点
2. 达成的共识
3. 存在的分歧
4. 建议的后续行动

返回JSON格式。"""

        response = summary_llm.invoke(prompt)

        try:
            return json.loads(response.content)
        except:
            return {"summary": response.content}

# 使用
group = ResearchGroup("如何提高 AI Agent 的可靠性")

group.add_agent(ResearchGroupAgent(
    "Alice",
    "AI研究员",
    ["AI架构", "大语言模型"]
))

group.add_agent(ResearchGroupAgent(
    "Bob",
    "软件工程师",
    ["软件测试", "代码质量"]
))

group.add_agent(ResearchGroupAgent(
    "Charlie",
    "产品经理",
    ["用户体验", "需求分析"]
))

# 开始讨论
summary = group.discuss(max_rounds=5)
print("\n=== 讨论总结 ===")
print(json.dumps(summary, ensure_ascii=False, indent=2))

小结

多 Agent 系统能够解决更复杂的任务:

核心要点

  1. 多 Agent 优势

    • 专业化分工
    • 并行执行
    • 错误隔离
    • 易于扩展
  2. 规划机制

    • 任务分解
    • 动态调整
    • 依赖管理
    • 进度监控
  3. 通信模式

    • 层次式:Manager 协调 Worker
    • 平等式:Agent 互相通信
    • 网络式:复杂拓扑结构
  4. 主流框架

    • AutoGen:微软的对话式框架
    • CrewAI:角色明确的流程框架
    • LangGraph:状态机驱动的可控框架
  5. 实践建议

    • 明确每个 Agent 的角色
    • 设计清晰的通信协议
    • 实现错误隔离和恢复
    • 记录和分析 Agent 行为

下一篇文章将介绍 OpenAI 落地实践。

赞赏博主
评论 隐私政策